package ex.tableapi;

import java.util.concurrent.ExecutionException;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Table API example: keeps the rows whose {@code a}, {@code b} and {@code c} columns are all
 * non-null, groups them by one-hour tumbling windows on {@code rowtime} and by the lower-cased
 * value of {@code a}, and inserts the per-group average of {@code b} into a {@code print} sink
 * table.
 *
 * <p>This is the second example of the "Overview &amp; Examples" section of the Flink 1.18
 * Table API documentation:
 * https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/tableapi/#overview--examples
 */
public class Ex03 extends ApiFrame {
	/** Name of the sink table; used both in the DDL and as the insert target. */
	private static final String OUT_NAME = "printOutTable";

	/** DDL of the sink table, which uses the {@code print} connector. */
	private static final String CREATE_PRINT_OUT_DDL = "CREATE TABLE " + OUT_NAME + " (" +
			"a STRING, " +
			"bhour  TIMESTAMP(3), " +
			"avgBillingAmount  BIGINT " +
			") WITH (" +
			" 'connector' = 'print' " +
			")";

	/**
	 * Builds the windowed aggregation, inserts it into the sink table and waits for the job.
	 *
	 * @param args command-line arguments (unused)
	 * @throws InterruptedException if the thread is interrupted while waiting for the insert job
	 * @throws ExecutionException if waiting for the insert job ends with a failure
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		Ex03 demo = new Ex03();
		// Return value is intentionally discarded; presumably this initializes the environment
		// held by ApiFrame -- TODO confirm against ApiFrame.
		demo.getEnv();
		// NOTE(review): argument meaning ("myorder", "orders") is defined by ApiFrame.registerMysql.
		Table orders = demo.registerMysql("myorder", "orders");
		demo.registerConsole(CREATE_PRINT_OUT_DDL, OUT_NAME);

		Table result = orders
				.filter(
						and(
								$("a").isNotNull(),
								$("b").isNotNull(),
								$("c").isNotNull()))
				.select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
				.window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
				.groupBy($("hourlyWindow"), $("a"))
				.select(
						$("a"),
						// Window end becomes the "bhour" column of the sink table.
						$("hourlyWindow").end().as("bhour"),
						$("b").avg().as("avgBillingAmount"));

		result.printSchema();
		// executeInsert() only submits the job; await() blocks until it has finished so that
		// main does not return while the job is still running.
		result.executeInsert(OUT_NAME).await();
	}
}
